import threading

import redis


class RedisQueue(object):
    """Small message-queue helper built on Redis Streams and consumer groups.

    Producers call :meth:`send_msg`; consumers call :meth:`loop_listen`, which
    prepares the consumer group and then reads, processes, acknowledges and
    deletes entries one at a time.
    """

    def __init__(self, redis_config):
        """Create the underlying Redis client.

        :param redis_config: dict of keyword arguments forwarded to
            ``redis.StrictRedis`` (``decode_responses=True`` is always set,
            so it must not be present in ``redis_config``).
        """
        self.redis_client = redis.StrictRedis(decode_responses=True, **redis_config)

    def consumer(self, group, topic_list, clean):
        """Prepare the streams and the consumer group before consuming.

        :param group: consumer group name.
        :param topic_list: stream keys the group subscribes to.
        :param clean: when truthy, delete the streams first.
        :return: None
        """
        if clean:
            self.clean_queue(topic_list)
        self.create_groups(group, topic_list)

    def send_msg(self, topic, data):
        """Append one message to a stream.

        :param topic: stream key.
        :param data: dict of field/value pairs passed to ``XADD``.
        :return: None
        """
        self.redis_client.xadd(topic, data)

    def queue_len(self, topic):
        """Return the number of entries in a stream.

        :param topic: stream key.
        :return: the result of ``XLEN`` for ``topic``.
        """
        return self.redis_client.xlen(topic)

    def destroy_group(self, topic, group):
        """Destroy a consumer group on a stream.

        :param topic: stream key.
        :param group: consumer group name.
        :return: the result of ``XGROUP DESTROY``.
        """
        return self.redis_client.xgroup_destroy(topic, group)

    def clean_queue(self, topic_list):
        """Delete the given streams (best effort; not required for normal use).

        A Redis error on one topic is reported and does not prevent the
        remaining topics from being deleted.

        :param topic_list: stream keys to delete.
        :return: None
        """
        for topic in topic_list:
            try:
                self.redis_client.delete(topic)
            except redis.exceptions.RedisError as exc:
                # Best-effort cleanup: report the failure and keep going.
                print('failed to delete stream %r: %s' % (topic, exc))

    def create_groups(self, group, topic_list):
        """Create the consumer group on every topic (best effort).

        Each stream is created if missing (``mkstream=True``) and the group
        starts reading from the beginning of the stream (ID ``0``). A group
        that already exists is left untouched; any other Redis error is
        reported and the remaining topics are still processed.

        :param group: consumer group name.
        :param topic_list: stream keys to create the group on.
        :return: None
        """
        for topic in topic_list:
            try:
                self.redis_client.xgroup_create(topic, group, id='0', mkstream=True)
            except redis.exceptions.RedisError as exc:
                # BUSYGROUP means the group already exists, which is the
                # normal case on restart and not worth reporting.
                if 'BUSYGROUP' not in str(exc):
                    print('failed to create group %r on stream %r: %s'
                          % (group, topic, exc))

    def streams(self, topic_list):
        """Build the ``streams`` argument for ``XREADGROUP``.

        :param topic_list: stream keys to read from.
        :return: dict mapping every topic to ``'>'`` (only entries never
            delivered to the group).
        """
        return {topic: '>' for topic in topic_list}

    def loop_listen(self, process_fun, group, topic_list, clean):
        """Consume messages forever, one at a time.

        The group name is also used as the consumer name. Each entry is
        passed to ``process_fun``, then acknowledged and deleted from the
        stream. An exception raised by ``process_fun`` propagates and ends
        the loop, leaving that entry unacknowledged.

        :param process_fun: callable invoked as
            ``process_fun(topic, message_data, message_id)``.
        :param group: consumer group name.
        :param topic_list: stream keys to subscribe to.
        :param clean: when truthy, delete the streams before starting.
        :return: never returns normally.
        """
        print('try start redis consumer')
        self.consumer(group, topic_list, clean)
        streams = self.streams(topic_list)
        while True:
            # block=0 waits indefinitely; count=1 fetches a single entry.
            messages = self.redis_client.xreadgroup(group, group, streams, block=0, count=1)
            for stream, message_list in messages:
                for message_id, message_data in message_list:
                    process_fun(stream, message_data, message_id)
                    # Acknowledge first so the ID leaves the group's pending
                    # entries list; XDEL alone would leave it pending forever.
                    self.redis_client.xack(stream, group, message_id)
                    self.redis_client.xdel(stream, message_id)


